bd86a12fb0145a4de25c440972865e89a3023ec8,cdap-data-fabric/src/main/java/co/cask/cdap/data/stream/service/DistributedStreamService.java,DistributedStreamService,aggregate,#Set#,203
Before Change
StreamConfig config = streamAdmin.getConfig(streamName);
long filesSize = StreamUtils.fetchStreamFilesSize(config);
LOG.debug("Size of the files already present for stream {}: {}", streamName, filesSize);
createSizeAggregator(streamName, filesSize, new AtomicInteger(config.getNotificationThresholdMB()));
} catch (IOException e) {
LOG.error("Could not compute sizes of files for stream {}", streamName);
Throwables.propagate(e);
After Change
StreamConfig config = streamAdmin.getConfig(streamName);
long filesSize = StreamUtils.fetchStreamFilesSize(config);
LOG.debug("Size of the files already present for stream {}: {}", streamName, filesSize);
createSizeAggregator(streamName, filesSize, config.getNotificationThresholdMB());
} catch (IOException e) {
LOG.error("Could not compute sizes of files for stream {}", streamName);
Throwables.propagate(e);